#!/usr/bin/python3
# -*- coding: utf-8 -*-

import logging

from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka.errors import KafkaError

# Topic the script entry point both consumes from and produces to.
# NOTE: the "KAFAKA" / "PRODUCTER" spellings are kept as-is; renaming them
# would break any module that imports these constants.
KAFAKA_TOPIC = "3RD_RIDER_COORDINATES_COUNT"
# Broker host the producer connects to.
KAFAKA_HOST_PRODUCTER = "10.218.220.90"
# Broker host the consumer connects to.
KAFAKA_HOST_CONSUMER = "172.20.54.15"

# Broker port, used for both hosts above.
KAFAKA_PORT = 9092

# Configure the root logger at import time so that log records are written
# to a file.
logging.basicConfig(
    level=logging.INFO,  # minimum level written to the file; INFO and above are emitted
    # format='%(asctime)s  %(filename)s %(levelno)s : %(levelname)s %(message)s',  # more verbose alternative layout
    format='%(asctime)s : %(message)s',  # layout of each log line
    datefmt='%Y-%m-%d %A %H:%M:%S',  # timestamp format (%A is the full weekday name)
    filename='obs_info_rider.log',  # log file name, relative to the working directory
    filemode='w')  # file open mode, "w" or "a"; "w" truncates the file on every run


class Kafka_producer():
    """Producer side: publishes keyed messages to a single Kafka topic.

    Messages are told apart by the key they are sent with.
    """

    def __init__(self, kafkahost, kafkaport, kafkatopic):
        """Create a ``KafkaProducer`` connected to ``kafkahost:kafkaport``.

        Args:
            kafkahost: Broker host name or IP address.
            kafkaport: Broker port.
            kafkatopic: Topic that :meth:`send` publishes to.
        """
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        print("producer:h,p,t", kafkahost, kafkaport, kafkatopic)
        bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort
        )
        print("boot svr:", bootstrap_servers)
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

    def send(self, k, v):
        """Publish ``v`` under key ``k`` to the configured topic and flush.

        Args:
            k: Message key. A ``str`` is encoded as UTF-8; ``bytes`` and
                ``None`` (no key) are passed to the producer unchanged.
            v: Message value, handed to the producer as-is.

        A ``KafkaError`` raised while sending or flushing is printed, not
        propagated, so a failed send does not stop the caller.
        """
        # Only text keys need encoding; calling .encode() unconditionally
        # would raise AttributeError for bytes or None keys.
        if isinstance(k, str):
            k = k.encode('utf-8')
        try:
            # NOTE(review): send() returns a future that is not inspected
            # here; per-message delivery errors may only be visible on that
            # future - confirm whether they should be checked.
            self.producer.send(self.kafkatopic, key=k, value=v)
            # Flushing after every message keeps sends effectively
            # synchronous, matching the original behaviour.
            self.producer.flush()
        except KafkaError as e:
            print(e)


class Kafka_consumer():
    """Consumer side: reads messages from a single Kafka topic."""

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        """Create a ``KafkaConsumer`` subscribed to ``kafkatopic``.

        Args:
            kafkahost: Broker host name or IP address.
            kafkaport: Broker port.
            kafkatopic: Topic to subscribe to.
            groupid: Consumer group id.
        """
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort
        )
        self.consumer = KafkaConsumer(
            self.kafkatopic,
            group_id=self.groupid,
            bootstrap_servers=bootstrap_servers,
            # This must be a KafkaConsumer option. It was previously passed
            # to str.format() by a misplaced parenthesis, where it was
            # silently ignored and never reached the consumer.
            auto_offset_reset='earliest',
        )

    def consume_data(self):
        """Yield messages from the consumer one at a time.

        Iteration ends when the underlying consumer is exhausted. A
        ``KeyboardInterrupt`` raised while iterating is printed and ends the
        generator instead of propagating.
        """
        try:
            for message in self.consumer:
                yield message
        except KeyboardInterrupt as e:
            print(e)


if __name__ == '__main__':
    # Relay every message read from the consumer-side broker to the
    # producer-side broker on the same topic, echoing each payload to stdout.
    group = 'test'
    consumer = Kafka_consumer(KAFAKA_HOST_CONSUMER, KAFAKA_PORT, KAFAKA_TOPIC, group)
    producer = Kafka_producer(KAFAKA_HOST_PRODUCTER, KAFAKA_PORT, KAFAKA_TOPIC)

    for msg in consumer.consume_data():
        # Every relayed message is sent under the fixed key '1'.
        producer.send('1', msg.value)
        # The codec name was previously written 'utf-8)' with a stray
        # parenthesis inside the string literal.
        print(str(msg.value, 'utf-8'))